package com.kaigejava.studynote.queue;

import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.util.StringUtils;

import java.util.Date;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @BelongsProject: kaigejavastudy
 * @BelongsPackage: com.kaigejava.studynote.queue
 * @Author: kaigejava
 * @CreateTime: 2023-12-06  14:33
 * @Description: ArrayBlockingQueue队列测试类
 * @Version: 1.0
 */
@Slf4j
public class ArrayBlockingQueueTest {

    /**
     * 内存队列对象
     */
    private final Queue<String> gatewayDatasToReport = new ArrayBlockingQueue<>(60);

    /**
     * 线程池
     */
    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(24, new DefaultThreadFactory("abq-"));


    @Test
    public void mainTest() throws InterruptedException {
        //模拟消费者
        initGateReportService(5);

        for (int i = 0; i < 10; i++) {
            //模拟生产者
            Long timestamp = new Date().getTime();
            gatewayDataReport(timestamp, "这是消息:" + (i + 1));
        }

        TimeUnit.SECONDS.sleep(50);


    }

    public void gatewayDataReport(Long timestamp, String msg) {
        if (StringUtils.isEmpty(msg)) {
            System.out.println("消息为空,不处理");
            return;
        }

        synchronized (gatewayDatasToReport) {
            //将消息投放到队列中
            gatewayDatasToReport.offer("当前时间是：【" + timestamp + "】.获取到的消息内容为:" + msg);
            gatewayDatasToReport.notifyAll();
        }
    }


    /**
     * 消费者方法
     *
     * @param delay sleep时间
     */
    private void initGateReportService(final int delay) {
        scheduledExecutorService.submit(() -> {
            while (true) {
                try {
                    synchronized (gatewayDatasToReport) {
                        while (gatewayDatasToReport.isEmpty()) {
                            gatewayDatasToReport.wait(3);
                        }
                        sleep(delay);
                        reportGatewayPropertyUntilEmpty();
                        // 通知生产者
                        gatewayDatasToReport.notifyAll();
                    }
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                    log.error("网关数据上报异常", e);
                }
            }
        });
    }


    /**
     * 消费等待上数据，直到消费完
     */
    private void reportGatewayPropertyUntilEmpty() {
        String msg;
        while (!gatewayDatasToReport.isEmpty()) {
            //获取队列中的数据
            msg = gatewayDatasToReport.remove();
            System.out.println("从队列中获取到的数据是：" + msg);
        }
    }

    private void sleep(int second) {
        try {
            TimeUnit.SECONDS.sleep(second);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
